
Python: Add Azure Cosmos history provider package#4271

Open
eavanvalkenburg wants to merge 2 commits into microsoft:main from
eavanvalkenburg:cosmos_history_provider

Conversation

eavanvalkenburg (Member) commented Feb 25, 2026

Summary

  • add new Python package agent-framework-azure-cosmos
  • implement CosmosHistoryProvider for conversation history in Azure Cosmos DB
  • add batching for save/clear operations via `execute_item_batch`, and add `list_sessions`
  • add package-local sample and documentation updates
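Cosmos DB caps transactional batches at 100 operations, which is what the batching bullet above works around. The splitting can be sketched roughly like this (a simplified illustration; the helper name is hypothetical, not from the PR):

```python
def chunk_operations(operations: list, limit: int = 100) -> list[list]:
    """Split a flat list of batch operations into chunks of at most `limit`
    items, mirroring Cosmos DB's transactional batch size cap."""
    return [operations[i : i + limit] for i in range(0, len(operations), limit)]
```

Each chunk is then submitted as one transactional batch against the session's partition key.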

Validation

  • `uv run --directory packages/azure-cosmos poe test`
  • `uv run --directory packages/azure-cosmos poe lint`
  • `uv run --directory packages/azure-cosmos poe pyright`
  • `uv run --directory packages/azure-cosmos poe mypy`

Fixes #1390

Copilot AI review requested due to automatic review settings February 25, 2026 19:21
@markwallace-microsoft markwallace-microsoft added documentation Improvements or additions to documentation python labels Feb 25, 2026
Copilot AI (Contributor) left a comment


Pull request overview

Adds a new Python workspace package (agent-framework-azure-cosmos) that implements an Azure Cosmos DB–backed BaseHistoryProvider for persisting conversation history, along with unit tests and a runnable sample. This extends the Agent Framework's pluggable "context/history provider" story in the same way as existing integrations (e.g., Redis).

Changes:

  • Introduce agent-framework-azure-cosmos package with CosmosHistoryProvider (Cosmos DB transactional batch writes + session partitioning).
  • Add unit tests and package-local sample/README for the Cosmos history provider.
  • Wire the new package into the Python workspace (pyproject + uv.lock) and apply minor formatting cleanups in existing tests/modules.

Reviewed changes

Copilot reviewed 15 out of 16 changed files in this pull request and generated 7 comments.

| File | Description |
| --- | --- |
| python/uv.lock | Adds the new workspace member and locks the azure-cosmos dependency. |
| python/pyproject.toml | Registers agent-framework-azure-cosmos as a uv workspace source. |
| python/packages/core/tests/core/test_skills.py | Minor whitespace-only change. |
| python/packages/core/tests/core/test_function_invocation_logic.py | Formatting of assertion messages (single-line f-strings). |
| python/packages/core/agent_framework/_skills.py | Minor formatting/line-wrapping adjustments. |
| python/packages/azure-cosmos/pyproject.toml | New package metadata, dependencies, and tooling config. |
| python/packages/azure-cosmos/agent_framework_azure_cosmos/_history_provider.py | Implements CosmosHistoryProvider (get/save/clear/list + batching + container creation). |
| python/packages/azure-cosmos/agent_framework_azure_cosmos/__init__.py | Exports CosmosHistoryProvider + version discovery. |
| python/packages/azure-cosmos/tests/test_cosmos_history_provider.py | New unit test suite for the provider (mocked Cosmos client/container). |
| python/packages/azure-cosmos/samples/cosmos_history_provider.py | Runnable sample demonstrating agent usage with Cosmos-backed history. |
| python/packages/azure-cosmos/samples/__init__.py | Samples package marker + docstring. |
| python/packages/azure-cosmos/samples/README.md | Sample runner documentation. |
| python/packages/azure-cosmos/README.md | Package-level "getting started" documentation. |
| python/packages/azure-cosmos/LICENSE | Package license file. |
| python/packages/azure-cosmos/AGENTS.md | Package agent documentation (class + import path). |
| python/packages/azure-ai-search/tests/test_aisearch_context_provider.py | Removes redundant local imports + minor formatting. |

Comment on lines +171 to +172
```python
query = "SELECT c.message FROM c WHERE c.session_id = @session_id ORDER BY c.sort_key ASC"
parameters: list[dict[str, object]] = [{"name": "@session_id", "value": session_key}]
```
Copilot AI Feb 25, 2026

Documents include a source_id field, but reads/writes don’t use it: get_messages() queries only by session_id. If multiple history providers (or multiple source_ids) share the same container, histories will be mixed, and clear() may delete other providers’ data. Either remove source_id from stored documents, or (preferably) include it in the query filter (and in clear()/list_sessions() queries) so each provider instance is isolated.

Suggested change:

```python
query = (
    "SELECT c.message FROM c "
    "WHERE c.session_id = @session_id AND c.source_id = @source_id "
    "ORDER BY c.sort_key ASC"
)
parameters: list[dict[str, object]] = [
    {"name": "@session_id", "value": session_key},
    {"name": "@source_id", "value": self.source_id},
]
```

Comment on lines +212 to +215
```python
query = "SELECT c.id FROM c WHERE c.session_id = @session_id"
parameters: list[dict[str, object]] = [{"name": "@session_id", "value": session_key}]
items = container.query_items(query=query, parameters=parameters, partition_key=session_key)
```
Copilot AI Feb 25, 2026

clear() deletes all items for the session partition without filtering by source_id. If multiple provider instances share a container, calling clear() on one instance can delete another instance’s history. Consider including source_id in the delete query (and/or using a provider-specific partition key scheme).
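A source_id-scoped delete query could be built roughly as follows (a sketch assuming the `session_id`/`source_id` document fields from this PR; the helper name is hypothetical):

```python
def build_clear_query(session_key: str, source_id: str) -> tuple[str, list[dict[str, object]]]:
    """Build a delete-candidate query scoped to both the session partition
    and this provider instance's source_id, so clear() cannot touch
    documents written by other provider instances sharing the container."""
    query = "SELECT c.id FROM c WHERE c.session_id = @session_id AND c.source_id = @source_id"
    parameters: list[dict[str, object]] = [
        {"name": "@session_id", "value": session_key},
        {"name": "@source_id", "value": source_id},
    ]
    return query, parameters
```

The query is still executed against a single partition (`partition_key=session_key`), so the extra predicate adds isolation without forcing a cross-partition query.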

Comment on lines +229 to +230
```python
query = "SELECT DISTINCT VALUE c.session_id FROM c"
items = container.query_items(query=query, enable_cross_partition_query=True)
```
Copilot AI Feb 25, 2026

list_sessions() currently lists session IDs across the entire container. If the container can contain data for multiple providers/apps, consider filtering by source_id (or documenting that the container must be dedicated to a single provider instance) to avoid returning unrelated sessions.

Suggested change:

```python
query = "SELECT DISTINCT VALUE c.session_id FROM c WHERE c.source_id = @source_id"
parameters: list[dict[str, object]] = [{"name": "@source_id", "value": self.source_id}]
items = container.query_items(
    query=query,
    parameters=parameters,
    enable_cross_partition_query=True,
)
```

Comment on lines +58 to +89
```python
credential = AzureCliCredential()
client = AzureOpenAIResponsesClient(
    project_endpoint=project_endpoint,
    deployment_name=deployment_name,
    credential=credential,
)

# 3. Create an agent that uses the history provider as a context provider.
async with (
    CosmosHistoryProvider(
        endpoint=cosmos_endpoint,
        database_name=cosmos_database_name,
        container_name=cosmos_container_name,
        credential=cosmos_key or credential,
    ) as history_provider,
    client.as_agent(
        name="CosmosHistoryAgent",
        instructions="You are a helpful assistant that remembers prior turns.",
        context_providers=[history_provider],
        default_options={"store": False},
    ) as agent,
):
    # 4. Create a session (session_id is used as the partition key).
    session = agent.create_session()

    # 5. Run a multi-turn conversation; history is persisted by CosmosHistoryProvider.
    response1 = await agent.run("My name is Ada and I enjoy distributed systems.", session=session)
    print(f"Assistant: {response1.text}")

    response2 = await agent.run("What do you remember about me?", session=session)
    print(f"Assistant: {response2.text}")
    print(f"Container: {history_provider.container_name}")
```
Copilot AI Feb 25, 2026

AzureCliCredential() (aio) is created here but never closed. Please ensure the sample disposes it properly (e.g., async with AzureCliCredential() as credential: or await credential.close() in a finally) to avoid leaking network resources during repeated runs.

Suggested change:

```python
async with AzureCliCredential() as credential:
    client = AzureOpenAIResponsesClient(
        project_endpoint=project_endpoint,
        deployment_name=deployment_name,
        credential=credential,
    )
    # 3. Create an agent that uses the history provider as a context provider.
    async with (
        CosmosHistoryProvider(
            endpoint=cosmos_endpoint,
            database_name=cosmos_database_name,
            container_name=cosmos_container_name,
            credential=cosmos_key or credential,
        ) as history_provider,
        client.as_agent(
            name="CosmosHistoryAgent",
            instructions="You are a helpful assistant that remembers prior turns.",
            context_providers=[history_provider],
            default_options={"store": False},
        ) as agent,
    ):
        # 4. Create a session (session_id is used as the partition key).
        session = agent.create_session()
        # 5. Run a multi-turn conversation; history is persisted by CosmosHistoryProvider.
        response1 = await agent.run("My name is Ada and I enjoy distributed systems.", session=session)
        print(f"Assistant: {response1.text}")
        response2 = await agent.run("What do you remember about me?", session=session)
        print(f"Assistant: {response2.text}")
        print(f"Container: {history_provider.container_name}")
```
Comment on lines +13 to +17
```python
"""
This sample demonstrates CosmosHistoryProvider as an agent context provider.

Key components:
- AzureOpenAIResponsesClient configured with an Azure AI project endpoint
```
Copilot AI Feb 25, 2026

For consistency with existing samples, consider moving this descriptive triple-quoted block to come after the load_dotenv() call (rather than before it). This matches the structure used throughout python/samples/* and keeps environment loading in a consistent place.

Comment on lines +31 to +32
```python
# Load environment variables from .env file.
load_dotenv()
```
Copilot AI Feb 25, 2026

load_dotenv() is called after the descriptive triple-quoted block; in existing samples the .env load happens immediately after imports. Consider moving load_dotenv() up so env vars are loaded before any sample configuration text/instructions are presented, matching python/samples/SAMPLE_GUIDELINES.md.

```python
self._container: ContainerProxy | None = container_client
self._owns_client = False

if self._container is not None:
```
Copilot AI Feb 25, 2026

When container_client is provided, __init__ returns early, so database_name / container_name attributes are never set (and any provided database_name/container_name args are ignored). This can lead to AttributeError for callers that log/introspect these properties. Consider setting these attributes even in the injected-container path, or clearly documenting that they may be unset when container_client is used.

Suggested change:

```python
if self._container is not None:
    # When a container_client is provided, we may still want database/container
    # names for logging or introspection purposes. Use any explicitly supplied
    # values; they may be None if not provided.
    self.database_name = database_name
    self.container_name = container_name
```
moonbox3 (Contributor) left a comment

Automated Code Review

Reviewers: 3 | Confidence: 80%

✗ Correctness

The diff adds a new Azure Cosmos DB history provider package with a solid implementation and tests, applies formatting-only changes to several existing files, and removes six local `from agent_framework import Content` imports from test methods in test_aisearch_context_provider.py. The removal of these local imports is suspicious: no corresponding module-level import for `Content` is added anywhere in the diff, which would cause a NameError at runtime in all six affected test methods. The rest of the changes (new Cosmos package, formatting, pytest marker additions) are correct.

✓ Security Reliability

New Azure Cosmos DB history provider is generally well-structured: parameterized queries prevent injection, credential handling uses SecretString properly, and resource ownership is tracked for cleanup. The main concerns are: (1) a None session_id silently maps to a shared "default" partition, risking unintended cross-session data leakage; (2) partial batch failures in save_messages/clear are unhandled, leaving data in an inconsistent state; and (3) the __aexit__ can swallow the original exception if close() also raises. The remaining changes are formatting-only or test import hoisting and carry no risk.

✓ Test Coverage

The new CosmosHistoryProvider has solid test coverage for core CRUD operations (get, save, clear, list_sessions), initialization variants, lifecycle management (close, async context manager), and before/after run hooks. However, several non-trivial code paths lack tests: the batch-splitting logic when operations exceed _BATCH_OPERATION_LIMIT (100), the _session_partition_key fallback to 'default' when session_id is None, the _resolve_credential ValueError path when neither credential nor env key is provided, the RuntimeError in _get_container when database_client is None, and the filtering of non-dict message payloads in get_messages. The existing tests in test_aisearch_context_provider.py had only import-cleanup changes (moving Content import to module level) and formatting, which are fine.

Blocking Issues

  • Six test methods in test_aisearch_context_provider.py remove the local `from agent_framework import Content` import, but no module-level import for `Content` is added in this diff, which will cause `NameError: name 'Content' is not defined` when running these tests.

Suggestions

  • Consider adding from agent_framework import Content as a module-level import at the top of test_aisearch_context_provider.py (alongside the existing imports) to replace the removed local imports.
  • Consider raising an error or generating a unique key when session_id is None in _session_partition_key rather than silently falling back to a shared "default" partition, which can leak messages across unrelated callers.
  • Add error handling around execute_item_batch in save_messages and clear so a partial batch failure doesn't silently leave data in an inconsistent state (e.g., log the error, raise, or retry the remaining batch).
  • Guard close() inside __aexit__ with a try/except so that a failure during cleanup doesn't mask the original exception that triggered the exit.
  • Add a test for batch splitting in save_messages and clear when the number of items exceeds _BATCH_OPERATION_LIMIT (100). This is a non-trivial loop that could silently lose data if broken.
  • Add a test for _session_partition_key returning 'default' when session_id is None — this is a meaningful behavioral contract that callers may depend on.
  • Add a test for _resolve_credential raising ValueError when both credential and settings key are None (the third branch in that method).
  • Add a test for _get_container raising RuntimeError when _database_client is None, to ensure the guard clause works correctly.
  • Add a test for get_messages skipping non-dict message payloads (e.g., a malformed item with a string or None 'message' field), since the code explicitly checks isinstance(message_payload, dict).
  • The after_run test (test_after_run_stores_input_and_response) only asserts on len(batch_operations) == 2 but doesn't verify the content of the stored messages (roles, text). Consider adding assertions on the actual message payloads to match the thoroughness of test_saves_messages.

Automated review by moonbox3's agents


```python
def test_image_uri_content(self) -> None:
    from agent_framework import Content
```
Contributor

This line uses `Content.from_uri(...)`, but the local `from agent_framework import Content` import was removed in this hunk and no module-level import for `Content` is visible in the diff. If `Content` is not already imported at the top of this file, all six affected test methods will fail with a NameError. Please verify a module-level import exists, or add one.

```python
async def get_messages(self, session_id: str | None, **kwargs: Any) -> list[Message]:
    """Retrieve stored messages for this session from Azure Cosmos DB."""
    session_key = self._session_partition_key(session_id)
    container = await self._get_container()
```
Contributor

_session_partition_key returns a hard-coded "default" when session_id is None. All callers that omit a session ID will silently share one partition and see each other's messages. Consider raising a ValueError or generating a random key instead to prevent accidental cross-session data leakage.

Suggested change:

```python
@staticmethod
def _session_partition_key(session_id: str | None) -> str:
    if session_id is None:
        raise ValueError("session_id is required for CosmosHistoryProvider operations.")
    return session_id
```

```python
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
```
Contributor

__aexit__ unconditionally awaits close(). If close() raises (e.g., network error closing the Cosmos client) while an exception is already propagating, the original exception is lost. Wrap in try/except to preserve the original error.

Suggested change:

```python
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: Any,
) -> None:
    """Async context manager exit."""
    try:
        await self.close()
    except Exception:
        if exc_type is None:
            raise
```
```python
class TestCosmosHistoryProviderSaveMessages:
    async def test_saves_messages(self, mock_container: MagicMock) -> None:
        provider = CosmosHistoryProvider(source_id="mem", container_client=mock_container)
        messages = [Message(role="user", contents=["Hello"]), Message(role="assistant", contents=["Hi"])]
```
Contributor

Consider adding a test that calls save_messages with more than _BATCH_OPERATION_LIMIT (100) messages to verify the batching loop issues multiple execute_item_batch calls. This is a data-integrity-critical code path.
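Such a test could look roughly like this, using a self-contained stand-in for the provider's save path since the real internals aren't shown in this hunk (names mirror the PR but are assumed):

```python
import asyncio
from unittest.mock import AsyncMock

_BATCH_OPERATION_LIMIT = 100  # Cosmos DB transactional batch cap

async def save_in_batches(container, documents, partition_key):
    """Stand-in for the provider's save path: one transactional batch
    per 100 documents (names mirror the PR; exact behavior is assumed)."""
    for start in range(0, len(documents), _BATCH_OPERATION_LIMIT):
        chunk = documents[start : start + _BATCH_OPERATION_LIMIT]
        operations = [("create", (doc,)) for doc in chunk]
        await container.execute_item_batch(batch_operations=operations, partition_key=partition_key)

def test_batch_splitting():
    # 150 documents must produce two batches: 100 ops, then 50 ops.
    container = AsyncMock()
    docs = [{"id": str(i)} for i in range(150)]
    asyncio.run(save_in_batches(container, docs, "s1"))
    assert container.execute_item_batch.await_count == 2
    first = container.execute_item_batch.await_args_list[0].kwargs["batch_operations"]
    second = container.execute_item_batch.await_args_list[1].kwargs["batch_operations"]
    assert len(first) == 100 and len(second) == 50
```

An equivalent test against the real `CosmosHistoryProvider.save_messages` would replace the stand-in with the provider and its mocked container fixture.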

```python
assert messages[0].text == "Hello"
assert messages[1].role == "assistant"
assert messages[1].text == "Hi"
```
Contributor

No test covers get_messages(None) which exercises the _session_partition_key fallback to "default". This is an important behavioral contract worth explicitly verifying.

Suggested change:

```python
async def test_empty_returns_empty(self, mock_container: MagicMock) -> None:
    mock_container.query_items.return_value = _to_async_iter([])
    provider = CosmosHistoryProvider(source_id="mem", container_client=mock_container)
    messages = await provider.get_messages("s1")
    assert messages == []

async def test_none_session_id_uses_default_partition(self, mock_container: MagicMock) -> None:
    mock_container.query_items.return_value = _to_async_iter([])
    provider = CosmosHistoryProvider(source_id="mem", container_client=mock_container)
    await provider.get_messages(None)
    call_kwargs = mock_container.query_items.call_args.kwargs
    assert call_kwargs["partition_key"] == "default"
```

```python
raise ValueError(
    "Azure Cosmos credential is required. Provide 'credential' or set 'AZURE_COSMOS_KEY' environment variable."
)
```
Contributor

The _resolve_credential ValueError branch (line 147-149) when neither credential nor settings key is provided has no corresponding test. Add a test that passes credential=None with no AZURE_COSMOS_KEY env var set to verify this error path.
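A sketch of such a test, written against a self-contained stand-in for the fallback logic (the real `_resolve_credential` signature and settings handling may differ):

```python
import os

def resolve_credential(credential, env=os.environ):
    """Stand-in for _resolve_credential's fallback chain (assumed from the PR):
    explicit credential wins, then the AZURE_COSMOS_KEY env var, else error."""
    if credential is not None:
        return credential
    key = env.get("AZURE_COSMOS_KEY")
    if key:
        return key
    raise ValueError(
        "Azure Cosmos credential is required. Provide 'credential' or set "
        "'AZURE_COSMOS_KEY' environment variable."
    )

def test_missing_credential_raises():
    # Neither an explicit credential nor the env key is available.
    try:
        resolve_credential(None, env={})
    except ValueError as exc:
        assert "AZURE_COSMOS_KEY" in str(exc)
    else:
        raise AssertionError("expected ValueError")
```

In the real test suite this would pass `credential=None` with the env var patched out (e.g. `monkeypatch.delenv`) and assert that constructing the provider raises.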

```python
async def test_after_run_stores_input_and_response(self, mock_container: MagicMock) -> None:
    provider = CosmosHistoryProvider(source_id="mem", container_client=mock_container)
    session = AgentSession(session_id="test")
    context = SessionContext(input_messages=[Message(role="user", contents=["hi"])], session_id="s1")
```
Contributor

This test asserts len(batch_operations) == 2 but does not verify the actual message content. Consider also asserting on the roles and text of the stored messages to match the thoroughness of test_saves_messages.

Suggested change:

```python
mock_container.execute_item_batch.assert_awaited_once()
batch_operations = mock_container.execute_item_batch.await_args.kwargs["batch_operations"]
assert len(batch_operations) == 2
input_doc = batch_operations[0][1][0]
assert input_doc["message"]["role"] == "user"
response_doc = batch_operations[1][1][0]
assert response_doc["message"]["role"] == "assistant"
```


Development

Successfully merging this pull request may close these issues.

.NET: Python: Add support for Azure Cosmos DB as a chat message store and a provider
